
import BaseHTTPServer
import cgi
import functools
import json
import math
import random
import socket
import SocketServer
import time
import threading
import unittest
import uuid
import urlparse

import redis

def acquire_lock_with_timeout(
    conn, lockname, acquire_timeout=10, lock_timeout=10):
    """Try to acquire the lock stored at ``'lock:' + lockname``.

    Polls, sleeping 1ms between attempts, until the lock is obtained or
    ``acquire_timeout`` seconds have passed.  ``lock_timeout`` is rounded up
    to a whole number of seconds and used as the lock key's expiration.

    Returns the unique identifier string written into the lock on success
    (the value ``release_lock`` compares against), or ``False`` if the lock
    could not be acquired in time.
    """
    identifier = str(uuid.uuid4())                      #A
    lockname = 'lock:' + lockname
    lock_timeout = int(math.ceil(lock_timeout))         #D

    end = time.time() + acquire_timeout
    while time.time() < end:
        if conn.setnx(lockname, identifier):            #B
            conn.expire(lockname, lock_timeout)         #B
            return identifier

        # Someone else holds the lock.  If it has no expiration (e.g. the
        # holder died between SETNX and EXPIRE) give it one so it cannot be
        # held forever.  Depending on the client/server version a missing
        # expiration is reported as a falsy value or as a negative number,
        # so both are treated as "no expiration".
        ttl = conn.ttl(lockname)                        #C
        if not ttl or ttl < 0:                          #C
            conn.expire(lockname, lock_timeout)         #C

        time.sleep(.001)

    return False

def release_lock(conn, lockname, identifier):
    """Delete ``'lock:' + lockname`` if it still holds ``identifier``.

    The key is WATCHed before it is read, and the delete runs inside a
    MULTI/EXEC block; when that raises ``WatchError`` the whole attempt is
    retried.

    Returns True when the lock was deleted, False when the stored value does
    not match ``identifier``.
    """
    key = 'lock:' + lockname
    pipe = conn.pipeline(True)

    while True:
        try:
            pipe.watch(key)                       #A
            if pipe.get(key) != identifier:       #A
                # Not our lock (any more): stop watching and give up.
                pipe.unwatch()
                return False                      #D

            pipe.multi()                          #B
            pipe.delete(key)                      #B
            pipe.execute()                        #B
            return True                           #B

        except redis.exceptions.WatchError:       #C
            continue                              #C

# Last configuration loaded for each 'config:<type>:<component>' key.
CONFIGS = {}
# Timestamp (time.time()) of the last Redis lookup for each config key.
CHECKED = {}

def get_config(conn, type, component, wait=1):
    """Return the configuration dict stored at ``config:<type>:<component>``.

    The JSON value is re-read from ``conn`` at most once every ``wait``
    seconds per key; in between, the copy cached in ``CONFIGS`` is returned.
    A missing key is treated as an empty configuration (``{}``).
    """
    key = 'config:%s:%s'%(type, component)

    now = time.time()
    # A key that was never checked defaults to timestamp 0 so that it is
    # always considered stale, instead of relying on None comparing as
    # smaller than any number.
    if CHECKED.get(key, 0) < now - wait:                #A
        CHECKED[key] = now                              #B
        config = json.loads(conn.get(key) or '{}')      #C
        old_config = CONFIGS.get(key)                   #D

        if config != old_config:                        #E
            CONFIGS[key] = config                       #F

    return CONFIGS.get(key)

# Redis client created for each 'config:redis:<component>' key.
REDIS_CONNECTIONS = {}

def redis_connection(component, wait=1):                        #A
    """Decorator factory that injects a configured Redis connection.

    The decorated function is called with the connection for ``component``
    as its first argument, followed by the caller's own arguments.  On each
    call the configuration is fetched through ``get_config`` (so it is
    refreshed at most every ``wait`` seconds) and a new ``redis.Redis`` is
    built whenever it differs from the previously cached configuration.

    NOTE(review): relies on a module-level ``config_connection`` that is not
    defined in this part of the file -- confirm it is set before first use.
    """
    key = 'config:redis:' + component                           #B

    def wrapper(function):                                      #C
        @functools.wraps(function)                              #D
        def call(*args, **kwargs):                              #E
            # A fresh object() never compares equal to a dict, which forces
            # a connection to be created on the very first call.
            previous = CONFIGS.get(key, object())               #F
            raw = get_config(                                   #G
                config_connection, 'redis', component, wait)    #G

            # Keyword argument names are passed as byte strings.
            settings = dict(                                    #L
                (option.encode('utf-8'), value)                 #L
                for option, value in raw.iteritems())           #L

            if settings != previous:                            #H
                REDIS_CONNECTIONS[key] = redis.Redis(**settings)  #H

            connection = REDIS_CONNECTIONS.get(key)             #I
            return function(connection, *args, **kwargs)        #I
        return call                                             #J
    return wrapper                                              #K

def execute_later(conn, queue, name, args):
    """Run the module-level function called ``name`` in a daemon thread.

    ``args`` is the positional argument list for that function; its first
    element must be ``conn`` itself.  ``queue`` is accepted but not used.
    """
    # this is just for testing purposes
    assert conn is args[0]
    target = globals()[name]
    worker = threading.Thread(target=target, args=tuple(args))
    worker.daemon = True
    worker.start()

# <start id="create-twitter-user"/>
def create_user(conn, login, name):
    """Create a user for ``login`` and return the new user id.

    Returns None when the per-login lock cannot be acquired within one
    second or when the lowercased login is already present in the 'users:'
    HASH.  The lock is released on every path, including when a Redis call
    raises.
    """
    llogin = login.lower()
    lock = acquire_lock_with_timeout(conn, 'user:' + llogin, 1) #A
    if not lock:                            #B
        return None                         #B

    # try/finally: do not leave the login locked until the lock's own
    # timeout if anything below fails.
    try:
        if conn.hget('users:', llogin):         #C
            return None                         #C

        user_id = conn.incr('user:id:')         #D
        pipeline = conn.pipeline(True)
        pipeline.hset('users:', llogin, user_id)    #E
        pipeline.hmset('user:%s'%user_id, {     #F
            'login': login,                     #F
            'id': user_id,                      #F
            'name': name,                       #F
            'followers': 0,                     #F
            'following': 0,                     #F
            'posts': 0,                         #F
            'signup': time.time(),              #F
        })
        pipeline.execute()
        return user_id                          #H
    finally:
        release_lock(conn, 'user:' + llogin, lock)  #G
# <end id="create-twitter-user"/>
#A Try to acquire the lock for the lowercased version of the login name. This function is defined in chapter 6
#B If we couldn't get the lock, then someone else already has the same login name
#C We also store a HASH of lowercased login names to user ids, so if there is already a login name that maps to an ID, we know and won't give it to a second person
#D Each user is given a unique id, generated by incrementing a counter
#E Add the lowercased login name to the HASH that maps from login names to user ids
#F Add the user information to the user's HASH
#G Release the lock over the login name
#H Return the id of the user
#END

# <start id="create-twitter-status"/>
def create_status(conn, uid, message, **data):
    """Store a new status message for user ``uid`` and return its id.

    Extra keyword arguments are stored as additional fields of the status
    HASH; the 'message', 'posted', 'id', 'uid' and 'login' fields always
    take the values computed here.  Returns None when the user HASH has no
    'login' field.
    """
    user_key = 'user:%s'%uid
    pipeline = conn.pipeline(True)
    pipeline.hget(user_key, 'login')        #A
    pipeline.incr('status:id:')             #B
    login, status_id = pipeline.execute()

    if not login:                           #C
        return None                         #C

    data['message'] = message               #D
    data['posted'] = time.time()            #D
    data['id'] = status_id                  #D
    data['uid'] = uid                       #D
    data['login'] = login                   #D

    pipeline.hmset('status:%s'%status_id, data) #D
    pipeline.hincrby(user_key, 'posts')     #E
    pipeline.execute()
    return status_id                        #F
# <end id="create-twitter-status"/>
#A Get the user's login name from their user id
#B Create a new id for the status message
#C Verify that we have a proper user account before posting
#D Prepare and set the data for the status message
#E Record the fact that a status message has been posted
#F Return the id of the newly created status message
#END

# <start id="fetch-page"/>
def get_status_messages(conn, uid, timeline='home:', page=1, count=30):#A
    """Return one page of status HASHes from the timeline ``timeline + uid``.

    Pages are 1-based and hold ``count`` entries, most recent first.  Status
    ids whose HASH comes back empty are left out of the result.
    """
    first = (page-1)*count
    last = page*count-1
    status_ids = conn.zrevrange('%s%s'%(timeline, uid), first, last)    #B

    pipeline = conn.pipeline(True)
    for status_id in status_ids:                                #C
        pipeline.hgetall('status:%s'%status_id)                 #C

    return [status for status in pipeline.execute() if status]  #D
# <end id="fetch-page"/>
#A We will take an optional 'timeline' argument, as well as page size and status message counts
#B Fetch the most recent status ids in the timeline
#C Actually fetch the status messages themselves
#D Filter will remove any 'missing' status messages that had been previously deleted
#END

# <start id="follow-user"/>
HOME_TIMELINE_SIZE = 1000
def follow_user(conn, uid, other_uid):
    """Make user ``uid`` follow ``other_uid``.

    Records the relationship in both ZSETs, bumps the counters in both user
    HASHes by the number of members actually added, and copies the followed
    user's most recent profile statuses into the follower's home timeline,
    trimmed to HOME_TIMELINE_SIZE entries.

    Returns None if ``uid`` already follows ``other_uid``, True otherwise.
    """
    following_key = 'following:%s'%uid          #A
    followers_key = 'followers:%s'%other_uid    #A
    home_key = 'home:%s'%uid

    already_following = conn.zscore(following_key, other_uid)   #B
    if already_following:                       #B
        return None                             #B

    followed_at = time.time()

    pipeline = conn.pipeline(True)
    pipeline.zadd(following_key, other_uid, followed_at)    #C
    pipeline.zadd(followers_key, uid, followed_at)          #C
    pipeline.zrevrange('profile:%s'%other_uid,      #E
        0, HOME_TIMELINE_SIZE-1, withscores=True)   #E
    results = pipeline.execute()
    following, followers, status_and_score = results[-3:]

    pipeline.hincrby('user:%s'%uid, 'following', int(following))        #F
    pipeline.hincrby('user:%s'%other_uid, 'followers', int(followers))  #F
    if status_and_score:
        pipeline.zadd(home_key, **dict(status_and_score))   #G
    pipeline.zremrangebyrank(home_key, 0, -HOME_TIMELINE_SIZE-1)    #G

    pipeline.execute()
    return True                         #H
# <end id="follow-user"/>
#A Cache the following and followers key names
#B If the other_uid is already being followed, return
#C Add the uids to the proper following and followers ZSETs
#E Fetch the most recent HOME_TIMELINE_SIZE status messages from the newly followed user's profile timeline
#F Update the known size of the following and followers ZSETs in each user's HASH
#G Update the home timeline of the following user, keeping only the most recent 1000 status messages
#H Return that the user was correctly followed
#END

# <start id="unfollow-user"/>
def unfollow_user(conn, uid, other_uid):
    """Make user ``uid`` stop following ``other_uid``.

    Removes the relationship from both ZSETs, lowers the 'following' and
    'followers' counters in the user HASHes by the number of members that
    were actually removed, and removes the unfollowed user's most recent
    profile statuses from ``uid``'s home timeline.

    Returns None if ``uid`` was not following ``other_uid``, True otherwise.
    """
    fkey1 = 'following:%s'%uid          #A
    fkey2 = 'followers:%s'%other_uid    #A

    if not conn.zscore(fkey1, other_uid):   #B
        return None                         #B

    pipeline = conn.pipeline(True)
    pipeline.zrem(fkey1, other_uid)                 #C
    pipeline.zrem(fkey2, uid)                       #C
    pipeline.zrevrange('profile:%s'%other_uid,      #E
        0, HOME_TIMELINE_SIZE-1)                    #E
    following, followers, statuses = pipeline.execute()[-3:]

    # The ZREM results count removed members, so they must be subtracted
    # from the cached sizes (adding them would grow the counters on every
    # unfollow).
    pipeline.hincrby('user:%s'%uid, 'following', -int(following))        #F
    pipeline.hincrby('user:%s'%other_uid, 'followers', -int(followers))  #F
    if statuses:
        pipeline.zrem('home:%s'%uid, *statuses)                 #G

    pipeline.execute()
    return True                         #H
# <end id="unfollow-user"/>
#A Cache the following and followers key names
#B If the other_uid is not being followed, return
#C Remove the uids from the proper following and followers ZSETs
#E Fetch the most recent HOME_TIMELINE_SIZE status messages from the user that we stopped following
#F Update the known size of the following and followers ZSETs in each user's HASH
#G Update the home timeline, removing any status messages from the previously followed user
#H Return that the unfollow executed successfully
#END

# <start id="exercise-refilling-timelines"/>
REFILL_USERS_STEP = 50
def refill_timeline(conn, incoming, timeline, start=0):
    """Refill ``timeline`` with recent statuses of the users in ``incoming``.

    ``incoming`` is a ZSET of user ids; users are processed in batches of
    REFILL_USERS_STEP ordered by score, beginning at score ``start``.  On
    the first pass (``start`` falsy) nothing is done if the timeline already
    holds at least 750 entries.  When a full batch was fetched, the next
    batch is scheduled through ``execute_later``.
    """
    if not start:
        if conn.zcard(timeline) >= 750:                         #A
            return                                              #A

    users = conn.zrangebyscore(incoming, start, 'inf',          #B
        start=0, num=REFILL_USERS_STEP, withscores=True)        #B

    fetcher = conn.pipeline(False)
    for uid, score in users:
        fetcher.zrevrange('profile:%s'%uid,                     #C
            0, HOME_TIMELINE_SIZE-1, withscores=True)           #C
        # Remember the last score seen; the next pass resumes from it.
        start = score

    messages = [                                            #D
        entry for result in fetcher.execute() for entry in result]  #D

    # Most recent first, then keep only what fits in a timeline.
    messages.sort(key=lambda pair: pair[1], reverse=True)   #E
    messages = messages[:HOME_TIMELINE_SIZE]                #E

    writer = conn.pipeline(True)
    if messages:
        writer.zadd(timeline, **dict(messages))             #F
    writer.zremrangebyrank(                                 #G
        timeline, 0, -HOME_TIMELINE_SIZE-1)                 #G
    writer.execute()

    if len(users) < REFILL_USERS_STEP:
        return
    execute_later(conn, 'default', 'refill_timeline',           #H
        [conn, incoming, timeline, start])                      #H
# <end id="exercise-refilling-timelines"/>
#A If the timeline is 3/4 of the way full already, don't bother refilling it
#B Fetch a group of users that should contribute to this timeline
#C Fetch the most recent status messages from the users followed
#D Group all of the fetched status messages together
#E Sort all of the status messages by how recently they were posted, and keep the most recent 1000
#F Add all of the fetched status messages to the user's home timeline
#G Remove any messages that are older than the most recent 1000
#H If there are still more users left to fetch from, keep going
#END

# <start id="exercise-follow-user-list"/>
def follow_user_list(conn, uid, other_uid, list_id):
    """Add ``other_uid`` to the list ``list_id``.

    Records the membership in the 'list:in:' and 'list:out:' ZSETs, stores
    the new member count in the list HASH, and copies the user's most recent
    profile statuses into the list timeline, trimmed to HOME_TIMELINE_SIZE
    entries.  ``uid`` is accepted but not used.

    Returns None if the list already follows ``other_uid``, True otherwise.
    """
    fkey1 = 'list:in:%s'%list_id            #A
    fkey2 = 'list:out:%s'%other_uid         #A
    timeline = 'list:statuses:%s'%list_id   #A

    if conn.zscore(fkey1, other_uid):   #B
        return None                     #B

    now = time.time()

    pipeline = conn.pipeline(True)
    pipeline.zadd(fkey1, other_uid, now)        #C
    pipeline.zadd(fkey2, list_id, now)          #C
    pipeline.zcard(fkey1)                       #D
    pipeline.zrevrange('profile:%s'%other_uid,      #E
        0, HOME_TIMELINE_SIZE-1, withscores=True)   #E
    following, status_and_score = pipeline.execute()[-2:]

    pipeline.hset('list:%s'%list_id, 'following', following)    #F
    # A user without any posts yields no members; skip the ZADD in that
    # case, the same way follow_user() does.
    if status_and_score:
        pipeline.zadd(timeline, **dict(status_and_score))       #G
    pipeline.zremrangebyrank(timeline, 0, -HOME_TIMELINE_SIZE-1)#G

    pipeline.execute()
    return True                         #H
# <end id="exercise-follow-user-list"/>
#A Cache the key names
#B If the other_uid is already being followed by the list, return
#C Add the uids to the proper ZSETs
#D Find the size of the list ZSET
#E Fetch the most recent status messages from the user's profile timeline
#F Update the known size of the list ZSETs in the list information HASH
#G Update the list of status messages
#H Return that adding the user to the list completed successfully
#END

# <start id="exercise-unfollow-user-list"/>
def unfollow_user_list(conn, uid, other_uid, list_id):
    """Remove ``other_uid`` from the list ``list_id``.

    Removes the membership from the 'list:in:' and 'list:out:' ZSETs, stores
    the new member count in the list HASH, removes the user's most recent
    profile statuses from the list timeline and then starts refilling that
    timeline from the remaining members.  ``uid`` is accepted but not used.

    Returns None if the list was not following ``other_uid``, True otherwise.
    """
    fkey1 = 'list:in:%s'%list_id            #A
    fkey2 = 'list:out:%s'%other_uid         #A
    timeline = 'list:statuses:%s'%list_id   #A

    if not conn.zscore(fkey1, other_uid):   #B
        return None                         #B

    pipeline = conn.pipeline(True)
    pipeline.zrem(fkey1, other_uid)                 #C
    pipeline.zrem(fkey2, list_id)                   #C
    pipeline.zcard(fkey1)                           #D
    pipeline.zrevrange('profile:%s'%other_uid,      #E
        0, HOME_TIMELINE_SIZE-1)                    #E
    following, statuses = pipeline.execute()[-2:]

    pipeline.hset('list:%s'%list_id, 'following', following)    #F
    if statuses:
        pipeline.zrem(timeline, *statuses)                      #G

    pipeline.execute()

    # Refill only after the removals above have been applied, so that
    # refill_timeline() sees the shrunken timeline; it also needs the
    # connection as its first argument.
    if statuses:
        refill_timeline(conn, fkey1, timeline)                  #H

    return True                         #I
# <end id="exercise-unfollow-user-list"/>
#A Cache the key names
#B If the other_uid is not being followed by the list, return
#C Remove the uids from the proper ZSETs
#D Find the size of the list ZSET
#E Fetch the most recent status messages from the user that we stopped following
#F Update the known size of the list ZSETs in the list information HASH
#G Update the list timeline, removing any status messages from the previously followed user
#H Start refilling the list timeline
#I Return that the unfollow executed successfully
#END

# <start id="exercise-create-user-list"/>
def create_user_list(conn, uid, name):
    """Create a list called ``name`` owned by user ``uid``; return its id.

    The list id is added to the owner's 'lists:<uid>' ZSET scored by the
    creation time, and the list information is stored in 'list:<id>'.
    Returns None when the user HASH has no 'login' field.
    """
    pipeline = conn.pipeline(True)
    pipeline.hget('user:%s'%uid, 'login')   #A
    pipeline.incr('list:id:')               #B
    login, list_id = pipeline.execute()

    if not login:               #C
        return None             #C

    now = time.time()

    pipeline = conn.pipeline(True)
    # The id is an integer, so it cannot be passed as a **keyword name;
    # use the positional (member, score) form used elsewhere in this file.
    pipeline.zadd('lists:%s'%uid, list_id, now) #D
    pipeline.hmset('list:%s'%list_id, {         #E
        'name': name,                           #E
        'id': list_id,                          #E
        'uid': uid,                             #E
        'login': login,                         #E
        'following': 0,                         #E
        'created': now,                         #E
    })
    pipeline.execute()

    return list_id      #F
# <end id="exercise-create-user-list"/>
#A Fetch the login name of the user who is creating the list
#B Generate a new list id
#C If the user doesn't exist, return
#D Add the new list to a ZSET of lists that the user has created
#E Create the list information HASH
#F Return the new list id
#END

# <start id="post-message"/>
def post_status(conn, uid, message, **data):
    """Create a status message, add it to the author's profile timeline and
    push it to the followers' timelines.

    Returns the new status id, or None if the status could not be created
    or its 'posted' time could not be read back.
    """
    status_id = create_status(conn, uid, message, **data)   #A
    if not status_id:           #B
        return None             #B

    posted_at = conn.hget('status:%s'%status_id, 'posted')  #C
    if not posted_at:                               #D
        return None                                 #D

    # Single-entry {member: score} mapping shared by every timeline update.
    post = {str(status_id): float(posted_at)}
    conn.zadd('profile:%s'%uid, **post)             #E

    syndicate_status(conn, uid, post)       #F
    return status_id
# <end id="post-message"/>
#A Create a status message using the earlier function
#B If the creation failed, return
#C Get the time that the message was posted
#D If the post wasn't found, return
#E Add the status message to the user's profile timeline
#F Actually push the status message out to the followers of the user
#END

# <start id="syndicate-message"/>
POSTS_PER_PASS = 1000           #A
def syndicate_status(conn, uid, post, start=0):
    """Add ``post`` to the home timelines of the followers of ``uid``.

    ``post`` is a ``{status_id: score}`` mapping.  Followers are handled in
    batches of POSTS_PER_PASS ordered by score, beginning at score
    ``start``; when a full batch was processed, the next batch is scheduled
    through ``execute_later``.
    """
    batch = conn.zrangebyscore('followers:%s'%uid, start, 'inf',    #B
        start=0, num=POSTS_PER_PASS, withscores=True)   #B

    pipeline = conn.pipeline(False)
    for follower, start in batch:                        #E
        home_key = 'home:%s'%follower
        pipeline.zadd(home_key, **post)                  #C
        pipeline.zremrangebyrank(                        #C
            home_key, 0, -HOME_TIMELINE_SIZE-1)          #C
    pipeline.execute()

    if len(batch) < POSTS_PER_PASS:                         #D
        return
    execute_later(conn, 'default', 'syndicate_status',      #D
        [conn, uid, post, start])                           #D
# <end id="syndicate-message"/>
#A Only send to 1000 users per pass
#B Fetch the next group of 1000 followers, starting at the last person to be updated last time
#E Iterating through the followers results will update the 'start' variable, which we can later pass on to subsequent syndicate_status() calls
#C Add the status to the home timelines of all of the fetched followers, and trim the home timelines so they don't get too big
#D If at least 1000 followers had received an update, execute the remaining updates in a task
#END

# <start id="syndicate-message-list"/>
def syndicate_status_list(conn, uid, post, start=0, on_lists=False):
    """Add ``post`` to follower home timelines, then to list timelines.

    With ``on_lists`` false the targets are the 'home:' timelines of the
    members of 'followers:<uid>'; with ``on_lists`` true they are the
    'list:statuses:' timelines of the members of 'list:out:<uid>'.  Targets
    are handled in batches of POSTS_PER_PASS beginning at score ``start``.
    Follow-up work (the next batch, or the switch from followers to lists)
    is scheduled through ``execute_later``.
    """
    key = 'followers:%s'%uid            #A
    base = 'home:%s'                    #A
    if on_lists:                        #A
        key = 'list:out:%s'%uid         #A
        base = 'list:statuses:%s'       #A
    followers = conn.zrangebyscore(key, start, 'inf',   #B
        start=0, num=POSTS_PER_PASS, withscores=True)   #B

    pipeline = conn.pipeline(False)
    for follower, start in followers:                   #C
        pipeline.zadd(base%follower, **post)            #C
        pipeline.zremrangebyrank(                       #C
            base%follower, 0, -HOME_TIMELINE_SIZE-1)    #C
    pipeline.execute()

    # The follow-up task must be this function: syndicate_status() does not
    # accept the on_lists argument and never visits list timelines.
    if len(followers) >= POSTS_PER_PASS:                        #D
        execute_later(conn, 'default', 'syndicate_status_list', #D
            [conn, uid, post, start, on_lists])                 #D

    elif not on_lists:
        execute_later(conn, 'default', 'syndicate_status_list', #E
            [conn, uid, post, 0, True])                         #E
# <end id="syndicate-message-list"/>
#A Use keys for home timelines or list timelines, depending on how far along we are
#B Fetch the next group of 1000 followers or lists, starting at the last user or list to be updated last time
#C Add the status to the home timelines of all of the fetched followers, and trim the home timelines so they don't get too big
#D If at least 1000 followers had received an update, execute the remaining updates in a task
#E Start executing over lists if we haven't executed over lists yet, but we are done with home timelines
#END

# <start id="delete-message"/>
def delete_status(conn, uid, status_id):
    """Delete status ``status_id`` if it belongs to user ``uid``.

    Removes the status HASH, removes its id from the owner's profile and
    home timelines, and decrements the owner's 'posts' counter.

    Returns True on success, None when the lock on the status cannot be
    acquired within one second or the stored 'uid' does not match.  The lock
    is released on every path, including when a Redis call raises.
    """
    key = 'status:%s'%status_id
    lock = acquire_lock_with_timeout(conn, key, 1)  #A
    if not lock:                #B
        return None             #B

    # try/finally: never leave the status locked until the lock times out.
    try:
        if conn.hget(key, 'uid') != str(uid):   #C
            return None                         #C

        pipeline = conn.pipeline(True)
        pipeline.delete(key)                            #D
        pipeline.zrem('profile:%s'%uid, status_id)      #E
        pipeline.zrem('home:%s'%uid, status_id)         #F
        pipeline.hincrby('user:%s'%uid, 'posts', -1)    #G
        pipeline.execute()
        return True
    finally:
        release_lock(conn, key, lock)
# <end id="delete-message"/>
#A Acquire a lock around the status object to ensure that no one else is trying to delete it when we are
#B If we didn't get the lock, return
#C If the user doesn't match the user stored in the status message, return
#D Delete the status message
#E Remove the status message id from the user's profile timeline
#F Remove the status message id from the user's home timeline
#G Reduce the number of posted messages in the user information HASH
#END

# <start id="exercise-clean-out-timelines"/>
def clean_timelines(conn, uid, status_id, start=0, on_lists=False):
    """Remove ``status_id`` from follower home timelines, then list timelines.

    With ``on_lists`` false the targets come from 'followers:<uid>' and are
    'home:' timelines; with ``on_lists`` true they come from
    'list:out:<uid>' and are 'list:statuses:' timelines.  Targets are
    handled in batches of POSTS_PER_PASS beginning at score ``start``;
    follow-up work is scheduled through ``execute_later``.
    """
    if on_lists:                                        #A
        source_key = 'list:out:%s'%uid                  #A
        timeline_fmt = 'list:statuses:%s'               #A
    else:
        source_key = 'followers:%s'%uid                 #A
        timeline_fmt = 'home:%s'                        #A

    targets = conn.zrangebyscore(source_key, start, 'inf',  #B
        start=0, num=POSTS_PER_PASS, withscores=True)       #B

    pipeline = conn.pipeline(False)
    for target, start in targets:                           #C
        pipeline.zrem(timeline_fmt%target, status_id)       #C
    pipeline.execute()

    if len(targets) >= POSTS_PER_PASS:                      #D
        # A full batch: there may be more targets of the same kind.
        next_args = [conn, uid, status_id, start, on_lists] #D
    elif not on_lists:
        # Home timelines are done; start over on the list timelines.
        next_args = [conn, uid, status_id, 0, True]         #E
    else:
        return

    execute_later(conn, 'default', 'clean_timelines', next_args)
# <end id="exercise-clean-out-timelines"/>
#A Use keys for home timelines or list timelines, depending on how far along we are
#B Fetch the next group of 1000 followers or lists, starting at the last user or list to be updated last time
#C Remove the status from the home timelines of all of the fetched followers
#D If at least 1000 followers had received an update, execute the remaining updates in a task
#E Start executing over lists if we haven't executed over lists yet, but we are done with home timelines
#END

# <start id="streaming-http-server"/>
class StreamingAPIServer(               #A
    SocketServer.ThreadingMixIn,        #B
    BaseHTTPServer.HTTPServer):         #B

    daemon_threads = True               #C

class StreamingAPIRequestHandler(               #D
    BaseHTTPServer.BaseHTTPRequestHandler):     #E
    """Request handler for the streaming endpoints.

    GET is served for '/statuses/sample.json' and POST for
    '/statuses/filter.json'; any other path gets a 404.
    """

    def _stream_if_path(self, expected_path):
        # Shared by do_GET/do_POST: split off the query string, then either
        # stream filtered content or answer with 404.
        parse_identifier(self)                              #G #K
        if self.path == expected_path:
            process_filters(self)                           #I #M
        else:
            self.send_error(404)                            #H #L

    def do_GET(self):                                       #F
        """Handle GET requests (sample stream)."""
        self._stream_if_path('/statuses/sample.json')

    def do_POST(self):                                      #J
        """Handle POST requests (filtered stream)."""
        self._stream_if_path('/statuses/filter.json')
# <end id="streaming-http-server"/>
#A Create a new class called 'StreamingAPIServer'
#B This new class should have the ability to create new threads with each request, and should be a HTTPServer
#C Tell the internals of the threading server to shut down all client request threads if the main server thread dies
#D Create a new class called 'StreamingAPIRequestHandler'
#E This new class should be able to handle HTTP requests
#F Create a method that is called do_GET(), which will be executed on GET requests performed against this server
#G Call a helper function that handles the fetching of an identifier for the client
#H If the request is not a 'sample' or 'firehose' streaming GET request, return a '404 not found' error
#I Otherwise, call a helper function that actually handles the filtering
#J Create a method that is called do_POST(), which will be executed on POST requests performed against this server
#K Call a helper function that handles the fetching of an identifier for the client
#L If the request is not a user, keyword, or location filter, return a '404 not found' error
#M Otherwise, call a helper function that actually handles the filtering
#END

# <start id="get-identifier"/>
def parse_identifier(handler):
    handler.identifier = None       #A
    handler.query = {}              #A
    if '?' in handler.path:         #B
        handler.path, _, query = handler.path.partition('?')    #C
        handler.query = urlparse.parse_qs(query)                #D
        identifier = handler.query.get('identifier') or [None]  #E
        handler.identifier = identifier[0]                      #F
# <end id="get-identifier"/>
#A Set the identifier and query arguments to be placeholder values
#B If there were query arguments as part of the request, process them
#C Extract the query portion from the path, and update the path
#D Parse the query
#E Fetch the list of query arguments with the name 'identifier'
#F Use the first identifier passed
#END

# <start id="stream-to-client"/>
# Must match the filter names that create_filters() dispatches on.
FILTERS = ('track', 'follow', 'location')                   #A
def process_filters(handler):
    """Stream matching status messages to the client as a chunked response.

    Expects ``parse_identifier(handler)`` to have run already.  Answers with
    401 when no identifier was supplied, or when a 'filter' request carries
    none of the fields named in FILTERS.  Otherwise sends a 200 response and
    writes every item produced by ``filter_content`` as one HTTP chunk until
    the content ends or the client connection fails.
    """
    identifier = handler.identifier
    if not identifier:                                      #B
        return handler.send_error(401, "identifier missing")#B

    method = handler.path.rsplit('/')[-1].split('.')[0]     #C
    name = None
    args = None
    if method == 'filter':                                  #D
        data = cgi.FieldStorage(                                #E
            fp=handler.rfile,                                   #E
            headers=handler.headers,                            #E
            environ={'REQUEST_METHOD':'POST',                   #E
                     'CONTENT_TYPE':handler.headers['Content-Type'],#E
        })

        # The first recognised field decides the filter; its value is a
        # comma separated list of arguments.
        for name in data:                               #F
            if name in FILTERS:                         #F
                args = data.getfirst(name).lower().split(',')   #F
                break                                   #F

        if not args:                                            #G
            return handler.send_error(401, "no filter provided")#G
    else:
        args = handler.query                                #M

    handler.send_response(200)                              #H
    handler.send_header('Transfer-Encoding', 'chunked')     #H
    handler.end_headers()

    # One-element list so the generator can observe the flag being set.
    quit = [False]                                          #N
    for item in filter_content(identifier, method, name, args, quit):   #I
        try:
            handler.wfile.write('%X\r\n%s\r\n'%(len(item), item))   #J
        except socket.error:                                    #K
            quit[0] = True                                      #K
    if not quit[0]:
        handler.wfile.write('0\r\n\r\n')                        #L
# <end id="stream-to-client"/>
#A Keep a listing of filters that need arguments
#B Return an error if an identifier was not provided by the client
#C Fetch the method, should be one of 'sample' or 'filter'
#D If this is a filtering method, we need to fetch the arguments
#E Parse the POST request to discover the type and arguments to the filter
#F Fetch any of the filters provided by the client request
#G If there were no filters specified, return an error
#M For sample requests, pass the query arguments as the 'args'
#H Finally return a response to the client, informing them that they will be receiving a streaming response
#N Use a Python list as a holder for a pass-by-reference variable, which will allow us to tell the content filter to stop receiving messages
#I Iterate over the results of the filter
#J Send the pre-encoded response to the client using the chunked encoding
#K If sending to the client caused an error, then we need to tell the subscriber to unsubscribe and shut down
#L Send the "end of chunks" message to the client if we haven't already disconnected
#END

# Keep a reference to the non-streaming version defined earlier.
_create_status = create_status
# <start id="create-message-streaming"/>
def create_status(conn, uid, message, **data):
    """Store a new status message for ``uid`` and publish it for streaming.

    Works like the earlier ``create_status`` and additionally publishes the
    JSON-encoded status on the 'streaming:status:' channel in the same
    transaction.  Returns the new status id, or None when the user HASH has
    no 'login' field.
    """
    user_key = 'user:%s'%uid
    pipeline = conn.pipeline(True)
    pipeline.hget(user_key, 'login')
    pipeline.incr('status:id:')
    login, status_id = pipeline.execute()

    if not login:
        return None

    data['message'] = message
    data['posted'] = time.time()
    data['id'] = status_id
    data['uid'] = uid
    data['login'] = login

    pipeline.hmset('status:%s'%status_id, data)
    pipeline.hincrby(user_key, 'posts')
    pipeline.publish('streaming:status:', json.dumps(data)) #A
    pipeline.execute()
    return status_id
# <end id="create-message-streaming"/>
#A The added line to send a message to streaming filters
#END

# Keep a reference to the non-streaming version defined earlier.
_delete_status = delete_status
# <start id="delete-message-streaming"/>
def delete_status(conn, uid, status_id):
    """Delete status ``status_id`` of user ``uid`` and announce the deletion.

    Works like the earlier ``delete_status`` and additionally publishes the
    status, marked with ``'deleted': True``, on the 'streaming:status:'
    channel in the same transaction.

    Returns True on success, None when the lock on the status cannot be
    acquired within one second or the stored 'uid' does not match.  The lock
    is released on every path, including when a Redis call raises.
    """
    key = 'status:%s'%status_id
    lock = acquire_lock_with_timeout(conn, key, 1)
    if not lock:
        return None

    # try/finally: never leave the status locked until the lock times out.
    try:
        if conn.hget(key, 'uid') != str(uid):
            return None

        pipeline = conn.pipeline(True)
        status = conn.hgetall(key)                                  #A
        status['deleted'] = True                                    #B
        pipeline.publish('streaming:status:', json.dumps(status))   #C
        pipeline.delete(key)
        pipeline.zrem('profile:%s'%uid, status_id)
        pipeline.zrem('home:%s'%uid, status_id)
        pipeline.hincrby('user:%s'%uid, 'posts', -1)
        pipeline.execute()
        return True
    finally:
        release_lock(conn, key, lock)
# <end id="delete-message-streaming"/>
#A Fetch the status message so that streaming filters can perform the same filters to determine whether the deletion should be passed to the client
#B Mark the status message as deleted
#C Publish the deleted status message to the stream
#END

# <start id="message-subscription"/>
@redis_connection('social-network')                         #A
def filter_content(conn, id, method, name, args, quit):
    """Generate the status messages that match the client's filter.

    Subscribes to the 'streaming:status:' channel and yields, for every
    published status accepted by the filter built from ``id``, ``method``,
    ``name`` and ``args``, either the raw JSON message or, for deleted
    statuses, a JSON ``{'id': ..., 'deleted': True}`` placeholder.

    ``quit`` is a one-element list; once ``quit[0]`` is true the generator
    stops after the item it is currently handling.  The subscription is
    reset when the generator finishes, fails or is closed.
    """
    match = create_filters(id, method, name, args)          #B

    pubsub = conn.pubsub()                      #C
    pubsub.subscribe(['streaming:status:'])     #C

    try:
        for item in pubsub.listen():                #D
            # listen() also delivers (un)subscribe confirmations, whose
            # 'data' is a subscription count rather than a JSON document;
            # only published messages are decoded.
            if item.get('type') == 'message':
                message = item['data']                  #E
                decoded = json.loads(message)           #E

                if match(decoded):                      #F
                    if decoded.get('deleted'):                      #G
                        yield json.dumps({                          #G
                            'id': decoded['id'], 'deleted': True})  #G
                    else:
                        yield message                   #H

            if quit[0]:                             #I
                break                               #I
    finally:
        pubsub.reset()                              #J
# <end id="message-subscription"/>
#A Use our automatic connection decorator from chapter 5
#B Create the filter that will determine whether a message should be sent to the client
#C Prepare the subscription
#D Receive messages from the subscription
#E Get the status message information from the subscription structure
#F Check if the status message matched the filter
#G For deleted messages, send a special 'deleted' placeholder for the message
#H For matched status messages that are not deleted, send the message itself
#I If the web server no longer has a connection to the client, stop filtering messages
#J Reset the Redis connection to ensure that the Redis server clears its outgoing buffers if this wasn't fast enough
#END

# <start id="create-filters"/>
def create_filters(id, method, name, args):
    """Return the filter callable for a streaming request.

    'sample' requests get a ``SampleFilter``; otherwise ``name`` selects
    between the track, follow and location filters.  Raises ``Exception``
    when neither applies.
    """
    if method == 'sample':                      #A
        return SampleFilter(id, args)           #A

    if name == 'track':                         #B
        return TrackFilter(args)                #B
    if name == 'follow':                        #B
        return FollowFilter(args)               #B
    if name == 'location':                      #B
        return LocationFilter(args)             #B

    raise Exception("Unknown filter")           #C
# <end id="create-filters"/>
#A For the 'sample' method, we don't need to worry about names, just the arguments
#B For the 'filter' method, we actually worry about which of the filters we want to apply, so return the specific filters for them
#C If no filter matches, then raise an exception
#END

# <start id="sample-filter"/>
def SampleFilter(id, args):                             #A
    """Build a filter that accepts a repeatable pseudo-random sample.

    id   -- seed for the shuffle; the same id always selects the same
            residues, so a given client sees a stable sample.
    args -- dict mapping parameter names to lists of strings; the first
            element of ``args['percent']`` (default '10') is parsed as a
            base-10 integer giving how many of the 100 residues to keep.
            At least one residue is always kept; values of 100 or more
            keep everything.

    Returns a function ``check(status)`` that is true when
    ``status['id']`` modulo 100 is one of the kept residues.  The status
    id may be an int or a numeric string.
    """
    percent = int(args.get('percent', ['10'])[0], 10)   #B
    # Materialize the list explicitly: shuffle() needs a mutable sequence.
    ids = list(range(100))                              #C
    shuffler = random.Random(id)                        #C
    shuffler.shuffle(ids)                               #C
    keep = set(ids[:max(percent, 1)])                   #D

    def check(status):                                  #E
        # Coerce to int first: with a string id, '%' would be string
        # formatting and raise TypeError instead of computing a remainder.
        return (int(status['id']) % 100) in keep        #F
    return check
# <end id="sample-filter"/>
#A We are defining a filter factory called "SampleFilter", which is called with 'id' and 'args' parameters and returns the filter function
#B The 'args' parameter is actually a dictionary, based on the parameters passed as part of the GET request
#C We use the 'id' parameter to randomly choose a subset of ids, the count of which is determined by the 'percent' argument passed
#D We will use a Python set to allow us to quickly determine whether a status message matches our criteria
#E The inner 'check' function is what gets returned, so the result of SampleFilter can be called like a function on each status message
#F To filter status messages, we fetch the status id, find its value modulo 100, and return whether it is in the status ids that we want to accept
#END

# <start id="track-filter"/>
def TrackFilter(list_of_strings):
    """Build a filter that matches messages containing a whole word group.

    Each string in ``list_of_strings`` is lower-cased and split on
    whitespace into a group of words; strings that yield no words are
    dropped.

    Returns a function ``check(status)`` that returns True when every
    word of at least one group appears among the lower-cased,
    whitespace-separated words of ``status['message']``, else False.
    """
    word_sets = [set(text.lower().split()) for text in list_of_strings]
    # Only groups with at least one word can ever match.
    groups = [words for words in word_sets if words]

    def check(status):
        message_words = set(status['message'].lower().split())
        # A group matches when it is a subset of the message's words.
        return any(group <= message_words for group in groups)
    return check
# <end id="track-filter"/>
#A The filter should have been provided with a list of word groups, and the filter matches if a message has all of the words in any of the groups
#B We will only keep groups that have at least 1 word
#C We are going to split words in the message on whitespace
#D Then we are going to iterate over all of the groups
#E If all of the words in any of the groups match, we will accept the message with this filter
#END

# <start id="follow-filter"/>
def FollowFilter(names):
    """Build a filter that matches statuses posted by or mentioning names.

    Every entry of ``names`` is normalized to lower-case '@username'
    form (any leading '@' characters are stripped before one is added).

    Returns a function ``check(status)`` that returns the set of
    normalized names found among the lower-cased whitespace-separated
    words of ``status['message']`` plus '@' + ``status['login']``.  The
    set is empty (falsy) when nothing matches.
    """
    nset = set('@' + name.lower().lstrip('@') for name in names)

    def check(status):
        candidates = status['message'].lower().split()
        # The poster counts as a candidate too.
        candidates.append('@' + status['login'].lower())
        return set(candidates) & nset
    return check
# <end id="follow-filter"/>
#A We are going to try to match login names against posters and messages
#B Make all of the names consistently stored as '@username'
#C Construct a set of words from the message and the poster's name
#D Consider the message a match if any of the usernames provided match any of the whitespace-separated words in the message
#END

# <start id="location-filter"/>
def LocationFilter(list_of_boxes):
    """Build a filter that matches statuses located inside any given box.

    ``list_of_boxes`` is a flat sequence of numbers (or numeric strings)
    read four at a time; each complete group of four is one box laid out
    as (min longitude, min latitude, max longitude, max latitude).  A
    trailing group with fewer than four values is ignored.

    Returns a function ``check(status)`` that returns True when
    ``status['location']`` (a 'latitude,longitude' string) falls inside
    at least one box, bounds included, and False when it does not or
    when the status carries no location.
    """
    boxes = []                                                  #A
    for start in range(0, len(list_of_boxes) - 3, 4):           #A
        boxes.append(                                           #A
            [float(value) for value in list_of_boxes[start:start + 4]])

    # 'check' is a plain closure over 'boxes': it takes only the status,
    # matching how the other filters are called.
    def check(status):
        location = status.get('location')           #B
        if not location:                            #C
            return False                            #C

        lat, lon = [float(part) for part in location.split(',')]  #D
        for box in boxes:                           #E
            if (box[1] <= lat <= box[3] and         #F
                    box[0] <= lon <= box[2]):       #F
                return True                         #F
        return False
    return check
# <end id="location-filter"/>
#A We are going to create a set of boxes that define the regions that should return messages
#B Try to fetch 'location' data from a status message
#C If the message has no location information, then it can't be inside the boxes
#D Otherwise, extract the latitude and longitude of the location
#E To match one of the boxes, we need to iterate over all boxes
#F If the message status location is within the required latitude and longitude range, then the status message matches the filter
#END

# Keep a reference to whatever filter_content is bound to at this point,
# because the def that follows rebinds the name.
_filter_content = filter_content
def filter_content(identifier, method, name, args, quit):
    print "got:", identifier, method, name, args
    for i in xrange(10):
        yield json.dumps({'id':i})
        if quit[0]:
            break
        time.sleep(.1)
'''
# <start id="start-http-server"/>
if __name__ == '__main__':                  #A
    server = StreamingAPIServer(                        #B
        ('localhost', 8080), StreamingAPIRequestHandler)#B
    print 'Starting server, use <Ctrl-C> to stop'       #C
    server.serve_forever()                  #D
# <end id="start-http-server"/>
#A Run the below block of code if this module is being run from the command line
#B Create an instance of the streaming API server listening on localhost port 8080, and use the StreamingAPIRequestHandler to process requests
#C Print an informational line
#D Run the server until someone kills it
#END
'''

class TestCh08(unittest.TestCase):
    # Integration tests for the user, follow, status and timeline functions
    # of this module.  Every test talks to a Redis server through
    # redis.Redis(db=15); that database is flushed before and after each
    # test, so it must not hold data anyone wants to keep.
    def setUp(self):
        # Start every test from an empty database 15.
        self.conn = redis.Redis(db=15)
        self.conn.flushdb()
    def tearDown(self):
        # Remove whatever the test wrote.
        self.conn.flushdb()

    def test_create_user_and_status(self):
        # The first user gets id 1; creating a second user with the same
        # login ('TestUser') returns None.
        self.assertEquals(create_user(self.conn, 'TestUser', 'Test User'), 1)
        self.assertEquals(create_user(self.conn, 'TestUser', 'Test User2'), None)

        # The first status gets id 1 and the 'posts' field of the user's
        # hash reads '1' afterwards.
        self.assertEquals(create_status(self.conn, 1, "This is a new status message"), 1)
        self.assertEquals(self.conn.hget('user:1', 'posts'), '1')

    def test_follow_unfollow_user(self):
        self.assertEquals(create_user(self.conn, 'TestUser', 'Test User'), 1)
        self.assertEquals(create_user(self.conn, 'TestUser2', 'Test User2'), 2)
        
        # User 1 follows user 2: only followers:2 and following:1 gain a
        # member, and the counters in both user hashes agree with that.
        self.assertTrue(follow_user(self.conn, 1, 2))
        self.assertEquals(self.conn.zcard('followers:2'), 1)
        self.assertEquals(self.conn.zcard('followers:1'), 0)
        self.assertEquals(self.conn.zcard('following:1'), 1)
        self.assertEquals(self.conn.zcard('following:2'), 0)
        self.assertEquals(self.conn.hget('user:1', 'following'), '1')
        self.assertEquals(self.conn.hget('user:2', 'following'), '0')
        self.assertEquals(self.conn.hget('user:1', 'followers'), '0')
        self.assertEquals(self.conn.hget('user:2', 'followers'), '1')

        # Unfollowing in the direction that was never followed returns None;
        # undoing the real follow returns True and brings every set and
        # counter back to zero.
        self.assertEquals(unfollow_user(self.conn, 2, 1), None)
        self.assertEquals(unfollow_user(self.conn, 1, 2), True)
        self.assertEquals(self.conn.zcard('followers:2'), 0)
        self.assertEquals(self.conn.zcard('followers:1'), 0)
        self.assertEquals(self.conn.zcard('following:1'), 0)
        self.assertEquals(self.conn.zcard('following:2'), 0)
        self.assertEquals(self.conn.hget('user:1', 'following'), '0')
        self.assertEquals(self.conn.hget('user:2', 'following'), '0')
        self.assertEquals(self.conn.hget('user:1', 'followers'), '0')
        self.assertEquals(self.conn.hget('user:2', 'followers'), '0')
        
    def test_syndicate_status(self):
        # A status posted by user 2 must show up for follower user 1.
        self.assertEquals(create_user(self.conn, 'TestUser', 'Test User'), 1)
        self.assertEquals(create_user(self.conn, 'TestUser2', 'Test User2'), 2)
        self.assertTrue(follow_user(self.conn, 1, 2))
        self.assertEquals(self.conn.zcard('followers:2'), 1)
        self.assertEquals(self.conn.hget('user:1', 'following'), '1')
        self.assertEquals(post_status(self.conn, 2, 'this is some message content'), 1)
        self.assertEquals(len(get_status_messages(self.conn, 1)), 1)

        # Add users 3..10, all following user 2, for nine followers in total.
        for i in xrange(3, 11):
            self.assertEquals(create_user(self.conn, 'TestUser%s'%i, 'Test User%s'%i), i)
            follow_user(self.conn, i, 2)

        # Rebind the module-level POSTS_PER_PASS to 5 -- presumably so that
        # nine followers need more than one syndication pass; confirm
        # against the posting code.
        # NOTE(review): the value is not restored afterwards, so it stays in
        # effect for any test that runs later in the same process.
        global POSTS_PER_PASS
        POSTS_PER_PASS = 5
        
        self.assertEquals(post_status(self.conn, 2, 'this is some other message content'), 2)
        # Presumably gives deferred syndication time to finish -- confirm.
        time.sleep(.1)
        self.assertEquals(len(get_status_messages(self.conn, 9)), 2)

        # After user 1 unfollows user 2, user 1 has no status messages left.
        self.assertTrue(unfollow_user(self.conn, 1, 2))
        self.assertEquals(len(get_status_messages(self.conn, 1)), 0)

    def test_refill_timeline(self):
        self.assertEquals(create_user(self.conn, 'TestUser', 'Test User'), 1)
        self.assertEquals(create_user(self.conn, 'TestUser2', 'Test User2'), 2)
        self.assertEquals(create_user(self.conn, 'TestUser3', 'Test User3'), 3)
        
        self.assertTrue(follow_user(self.conn, 1, 2))
        self.assertTrue(follow_user(self.conn, 1, 3))

        # Rebind the module-level HOME_TIMELINE_SIZE to 5; the assertions
        # below expect user 1 to see at most five messages.
        # NOTE(review): not restored afterwards, so later tests in the same
        # process see 5 as well.
        global HOME_TIMELINE_SIZE
        HOME_TIMELINE_SIZE = 5
        
        # Users 2 and 3 each post ten messages, with short pauses in between.
        for i in xrange(10):
            self.assertTrue(post_status(self.conn, 2, 'message'))
            self.assertTrue(post_status(self.conn, 3, 'message'))
            time.sleep(.05)

        # User 1 sees five messages; unfollowing user 2 leaves fewer than
        # five.
        self.assertEquals(len(get_status_messages(self.conn, 1)), 5)
        self.assertTrue(unfollow_user(self.conn, 1, 2))
        self.assertTrue(len(get_status_messages(self.conn, 1)) < 5)

        # Refilling home:1 from following:1 brings the count back to five,
        # and every message now carries uid '3'.
        refill_timeline(self.conn, 'following:1', 'home:1')
        messages = get_status_messages(self.conn, 1)
        self.assertEquals(len(messages), 5)
        for msg in messages:
            self.assertEquals(msg['uid'], '3')
        
        # After deleting a status, get_status_messages returns one fewer
        # message while home:1 still has five entries; clean_timelines then
        # drops the entry from home:1 as well.
        delete_status(self.conn, '3', messages[-1]['id'])
        self.assertEquals(len(get_status_messages(self.conn, 1)), 4)
        self.assertEquals(self.conn.zcard('home:1'), 5)
        clean_timelines(self.conn, '3', messages[-1]['id'])
        self.assertEquals(self.conn.zcard('home:1'), 4)

# Run the unittest test runner when this file is executed as a script.
if __name__ == '__main__':
    unittest.main()
